package src

import (
	"encoding/json"
	"fmt"
	"math"
	"os"
	"strconv"
	"strings"

	"gitee.com/haifengat/goctp/v2"
	"github.com/sirupsen/logrus"
)

// startQuote creates the package-level quote client md, installs its
// disconnect and tick callbacks, and logs in with the configured front,
// broker and account settings. On a successful login it hands the login
// response to onMdLogin; on a login error it logs the response info and
// releases the client.
func startQuote() {
	md = goctp.NewQuotePro()
	logrus.Infof("quote:%s broker:%s user:%s", quoteFront, brokerID, investorID)

	// A non-zero disconnect reason releases the client; reason 0 is only logged.
	md.OnFrontDisconnected = func(nReason int) {
		logrus.Info("quote disconected ", nReason)
		if nReason == 0 {
			return
		}
		releaseQuote()
	}

	// Pass the tick by value so the handler does not hold on to the callback's pointer.
	md.OnRtnTick = func(pDepthMarketData *goctp.CThostFtdcDepthMarketDataField) {
		onTick(*pDepthMarketData)
	}

	cfg := goctp.LoginConfig{
		Front:    quoteFront,
		Broker:   brokerID,
		UserID:   investorID,
		Password: password,
		AppID:    appID,
		AuthCode: authCode,
	}
	loginRsp, rspInfo := md.Start(cfg)
	if rspInfo.ErrorID == 0 {
		onMdLogin(&loginRsp)
		return
	}
	logrus.Errorf("%+v", rspInfo)
	releaseQuote()
}

// releaseQuote releases the package-level quote client md by calling its
// Release method. Within this file it is invoked when login fails and when
// the front disconnects with a non-zero reason.
// NOTE(review): md is assumed to be non-nil here (set by startQuote) — confirm
// no other caller can reach this before startQuote has run.
func releaseQuote() {
	md.Release()
}

// onTick folds one market-data tick into the current one-minute bar of the
// tick's instrument (kept in instLastMin) and, once the bar has accumulated
// enough traded ticks, writes it to Redis, publishes it on "md.<instrument>"
// and pushes it to the SSE helpers.
//
// Ticks are dropped without any effect when:
//   - the last price is out of range (>= math.MaxFloat32);
//   - the instrument has no recorded status or is not in continuous trading;
//   - the update time or the selected action day is too short / not parseable;
//   - the tick belongs to a minute earlier than the stored bar.
func onTick(tick goctp.CThostFtdcDepthMarketDataField) {
	lastPrice := float64(tick.LastPrice)
	if lastPrice >= math.MaxFloat32 {
		return
	}
	inst := tick.InstrumentID.String()

	// Instrument status filter. (Original note: this filter can delay storage.)
	status, found := mapInstrumentStatus.Load(inst)
	if !found {
		return
	}
	if st, ok := status.(goctp.TThostFtdcInstrumentStatusType); !ok || st != goctp.THOST_FTDC_IS_Continous {
		return
	}
	cntTicks++

	// Build the minute timestamp from the tick's HH:MM and the matching date.
	updateTime := tick.UpdateTime.String()
	if len(updateTime) < 5 {
		// Too short to contain "HH:MM"; slicing it would panic.
		return
	}
	hour, err := strconv.Atoi(updateTime[0:2])
	if err != nil {
		// A malformed hour must not be mistaken for hour 0 (night session).
		return
	}
	action := tradingDay
	switch {
	case hour <= 3: // night session, after midnight
		action = actionDayNext
	case hour >= 20: // night session, before midnight
		action = actionDay
	}
	if len(action) < 8 {
		// The day string cannot be split into year/month/day; skip instead of panicking.
		return
	}
	minDateTime := fmt.Sprintf("%s-%s-%s %s:00", action[0:4], action[4:6], action[6:], tick.UpdateTime[0:5])

	totalVol := int(int32(tick.Volume))
	openInterest := float64(tick.OpenInterest)
	// newBar builds a fresh bar for this tick's minute with all prices at lastPrice.
	newBar := func(preVol, volume int) *Bar {
		return &Bar{
			DateTime:     minDateTime,
			InstrumentID: inst,
			PreVol:       preVol,
			OpenInterest: openInterest,
			Ticks:        1,
			TradingDay:   TradingDayType(tradingDay),
			Open:         lastPrice,
			High:         lastPrice,
			Low:          lastPrice,
			Close:        lastPrice,
			Volume:       volume,
		}
	}

	// Look up first so a Bar is only allocated for an instrument's first tick.
	obj, loaded := instLastMin.Load(inst)
	if !loaded {
		if obj, loaded = instLastMin.LoadOrStore(inst, newBar(totalVol, 0)); !loaded {
			// First bar for this instrument was just stored; nothing more to do.
			return
		}
	}
	bar, ok := obj.(*Bar)
	if !ok || bar == nil {
		return
	}

	switch {
	case minDateTime < bar.DateTime:
		// Ticks older than the stored minute are ignored.
		return
	case minDateTime > bar.DateTime:
		// New minute: volume already accounted for becomes the new baseline.
		preVol := bar.PreVol + bar.Volume
		bar = newBar(preVol, totalVol-preVol)
	default:
		// Same minute: update the running bar.
		const epsilon = 0.000001
		if lastPrice-bar.High > epsilon {
			bar.High = lastPrice
		}
		if bar.Low-lastPrice > epsilon {
			bar.Low = lastPrice
		}
		bar.Close = lastPrice
		bar.Volume = totalVol - bar.PreVol
		bar.OpenInterest = openInterest
		bar.Fix()
		if rdb != nil && bar.Volume > 0 { // skip bars without traded volume
			bar.Ticks++
			// Require a minimum number of ticks per minute to avoid data
			// produced during trading breaks. The third tick appends the bar
			// to the Redis list; later ticks overwrite that last element.
			if bar.Ticks >= 3 {
				payload, mErr := json.Marshal(bar)
				if mErr != nil {
					logrus.Errorf("json marshal bar error: %s %v", inst, mErr)
				} else {
					if bar.Ticks == 3 {
						if rErr := rdb.RPush(ctxRedis, inst, payload).Err(); rErr != nil {
							logrus.Errorf("redis rpush error: %s %v", inst, rErr)
						}
					} else if rErr := rdb.LSet(ctxRedis, inst, -1, payload).Err(); rErr != nil {
						logrus.Errorf("redis lset error: %s %v", inst, rErr)
					}
					// Publish the minute bar.
					if pErr := rdb.Publish(ctxRedis, "md."+inst, payload).Err(); pErr != nil {
						logrus.Errorf("redis publish error: %s %v", inst, pErr)
					}
				}
			}
			sseBar(bar) // push to SSE
			sseTick(&tick)
		}
	}
	instLastMin.Store(inst, bar) // remember the current bar
}

// onMdLogin runs after a successful quote login. For every instrument in
// trd.Instruments it restores the most recent stored minute bar from Redis
// (when a Redis client is configured) into instLastMin and copies the status
// of the instrument's product into mapInstrumentStatus. It then subscribes to
// market data for all instruments with a non-empty product ID, restricted to
// the configured products list when that list is non-empty (case-insensitive).
//
// The subscription callback counts successes and failures; when the final
// response arrives and any subscription failed, the process exits with -2.
func onMdLogin(login *goctp.CThostFtdcRspUserLoginField) {
	logrus.Infof("quote login: %+v", *login)

	for inst, v := range trd.Instruments {
		// Restore the latest stored bar. rdb may be nil (onTick guards for it too).
		if rdb != nil {
			jsonMin, err := rdb.LRange(ctxRedis, inst, -1, -1).Result()
			switch {
			case err != nil:
				logrus.Warnf("redis lrange error: %s %v", inst, err)
			case len(jsonMin) > 0:
				bar := new(Bar)
				if uErr := json.Unmarshal([]byte(jsonMin[0]), bar); uErr != nil {
					logrus.Warnf("unmarshal last bar error: %s %v", inst, uErr)
				} else {
					instLastMin.Store(inst, bar)
				}
			}
		}
		// Seed the instrument status from its product's status.
		if pid := v.ProductID.String(); len(pid) > 0 {
			if status, ok := instrumentStatus[pid]; ok {
				mapInstrumentStatus.Store(inst, status)
			}
		}
	}

	// Upper-case the configured products once for case-insensitive matching.
	wanted := make(map[string]struct{}, len(products))
	for _, p := range products {
		wanted[strings.ToUpper(p)] = struct{}{}
	}

	// Collect the instruments to subscribe to.
	insts := make([]string, 0, len(trd.Instruments))
	for k, v := range trd.Instruments {
		pid := v.ProductID.String()
		if len(pid) == 0 { // skip abnormal instruments (e.g. combination contracts with an empty product ID)
			continue
		}
		if len(wanted) > 0 { // a product filter is configured
			if _, ok := wanted[strings.ToUpper(pid)]; !ok {
				continue // not in the list
			}
		}
		insts = append(insts, k)
	}

	// Subscribe to market data.
	success := 0
	failure := 0
	total := len(insts)
	md.OnRspSubMarketData = func(pSpecificInstrument *goctp.CThostFtdcSpecificInstrumentField, pRspInfo *goctp.CThostFtdcRspInfoField, nRequestID int, bIsLast bool) {
		if pRspInfo != nil && pRspInfo.ErrorID != 0 {
			id := ""
			if pSpecificInstrument != nil {
				id = pSpecificInstrument.InstrumentID.String()
			}
			logrus.Infof("%s: %+v", id, pRspInfo)
			failure++
		} else {
			success++
		}
		// NOTE(review): the "-30" slack is kept from the original; presumably it
		// tolerates several "last" responses when the request is split — confirm.
		if bIsLast && success+failure > total-30 {
			logrus.Infof("行情订阅, 成功: %d, 失败: %d", success, failure)
			if failure > 0 {
				os.Exit(-2)
			}
		}
	}
	if len(insts) > 0 {
		md.ReqSubscript(insts...)
	}
}
